Antonio Coín Castro
October 2021
from matplotlib import pyplot as plt
import arviz as az
import numpy as np
import pandas as pd
import pickle
import scipy.optimize
from multiprocessing import Pool
import utils
# Configuration
# Extensions
%load_ext autoreload
%autoreload 2
# Plotting configuration
%matplotlib inline
%config InlineBackend.figure_format = 'retina'
plt.rcParams['figure.figsize'] = [6, 4]
plt.style.use('arviz-darkgrid')
NCOLS = 3
def NROWS(x, ncols=NCOLS):
return np.ceil(x/ncols).astype('int')
# Randomness and reproducibility
SEED = 42
np.random.seed(SEED)
rng = np.random.default_rng(SEED)
# Floating point precision for display
np.set_printoptions(precision=3, suppress=True)
pd.set_option("display.precision", 3)
# Multiprocessing
N_CORES = 4
We consider the model
$$ Y_i = \mu + \Psi^{-1}_{X_i}(\alpha) + \varepsilon_i, $$
i.e.,
$$ Y_i \sim \mathcal N\left(\mu + \sum_{j=1}^p \beta_j X_i(t_j), \ \sigma^2\right). $$
The prior distributions we choose are:
\begin{align*} \pi(\mu, \sigma^2) & \propto 1/\sigma^2, \\ \tau & \sim \mathscr U([0, 1]^p), \\ \beta\mid \tau, \sigma^2 & \sim \mathcal N(b_0, g\sigma^2(\mathcal X_\tau' \mathcal X_\tau + \eta I)^{-1}). \end{align*}
Writing the parameter vector as $\theta = (\beta, \tau, \mu, \sigma^2)$, the joint posterior density is:
$$ \pi(\beta, \tau, \mu, \sigma^2\mid Y) \propto \frac{|G_\tau|^{1/2}}{\sigma^{p+n+2}} \exp\left\{ -\frac{1}{2\sigma^2} \left(\|Y- \mu\boldsymbol{1} - \mathcal X_\tau\beta\|^2 + \frac{1}{g}(\beta - b_0)'G_\tau(\beta - b_0) \right) \right\}. $$
Hence, the log-posterior is:
$$ \log \pi(\beta, \tau, \mu, \sigma^2\mid Y) \propto \frac{1}{2}\log |G_\tau| - (p+n+2)\log \sigma -\frac{1}{2\sigma^2} \left(\|Y-\mu\boldsymbol{1} - \mathcal X_\tau\beta\|^2 + \frac{1}{g}(\beta - b_0)'G_\tau(\beta - b_0) \right). $$
Note that for computational reasons we will work with $\log \sigma$ instead of $\sigma^2$. Since $d\sigma^2/d\log\sigma = 2\sigma^2$, the Jacobian of this change of variables cancels the $1/\sigma^2$ factor, so the associated prior becomes
$$ \pi(\mu, \log\sigma) \propto 1. $$
We generate a toy dataset with $n=100$ functional regressors $X_i(t) \sim \mathrm{BM}$ (standard Brownian motion), a response variable given by a "simple" RKHS function, a value of $\mu=5$ and a variance of $\sigma^2=1$:
$$ Y_i \sim \mathcal N\big(5 -5X_i(0.1) + 10X_i(0.8), \ 1\big). $$
We consider a regular grid of $N + 1 = 101$ equispaced points on $[0, 1]$. In addition, we center the $X_i$ so that they have zero mean when fed to the sampling algorithms.
def brownian_kernel(s, t, sigma=1.0):
return sigma*np.minimum(s, t)
n, N = 100, 100
grid = np.linspace(0., 1., N + 1)
beta_true = np.array([-5., 10.])
tau_true = np.array([0.1, 0.8])
mu_true = 5
sigma2_true = 1.0
theta_names = ["beta", "tau", "mu", "sigma2"]
X, Y = utils.generate_gp_dataset(
rng, grid, brownian_kernel, n,
beta_true, tau_true, mu_true, sigma2_true
)
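The helper utils.generate_gp_dataset is not shown in this notebook. The following is a minimal sketch of what such a generator could look like, assuming it samples the trajectories from the Gaussian process induced by the kernel on the grid and then builds the response via the RKHS model (the function name and the jitter value are illustrative, not the actual implementation):
def generate_gp_dataset_sketch(rng, grid, kernel_fn, n, beta, tau, mu, sigma2):
    """Hypothetical re-implementation: sample n GP paths on `grid` and
    build Y = mu + sum_j beta_j*X(tau_j) + Gaussian noise."""
    N_grid = len(grid)
    # Covariance matrix of the process evaluated on the grid
    K = kernel_fn(grid[:, np.newaxis], grid[np.newaxis, :])
    # Sample the trajectories (small jitter for numerical stability)
    L = np.linalg.cholesky(K + 1e-9*np.identity(N_grid))
    X = rng.standard_normal(size=(n, N_grid))@L.T
    # Evaluate each trajectory at the impact points tau (nearest grid index)
    idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
    Y = mu + X[:, idx]@beta + np.sqrt(sigma2)*rng.standard_normal(size=n)
    return X, Y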
# Standardize data
X_m = X.mean(axis=0)
X = X - X_m
utils.plot_dataset(X, Y, grid)
We will try to recover the model using $p=3$.
p_hat = 3
theta_ndim = 2*p_hat + 2
g = 5
eta = 0.1
sd_beta_init = 5
sd_mu_init = 10*np.abs(Y.mean()) # Grollemund et al (?)
sd_log_sigma_init = 1
# Labels
labels = []
for i in range(p_hat):
labels.append(fr"$\beta_{i + 1}$")
for i in range(p_hat):
labels.append(fr"$t_{i + 1}$")
labels.append(r"$\mu$")
labels.append(r"$\sigma^2$")
def initial_guess_random(p, sd_beta, sd_mu, sd_log_sigma, n_walkers=1):
beta_init = sd_beta*rng.standard_normal(size=(n_walkers, p))
tau_init = rng.uniform(size=(n_walkers, p))
mu_init = sd_mu*rng.standard_normal(size=(n_walkers, 1))
log_sigma_init = sd_log_sigma*rng.standard_normal(size=(n_walkers, 1))
init = np.hstack((
beta_init,
tau_init,
mu_init,
log_sigma_init
))
return init if n_walkers > 1 else init[0]
def initial_guess_around_value(
value, p, sd_beta=1, sd_tau=0.1,
sd_mu=1, sd_log_sigma=0.5, n_walkers=1):
beta_jitter = sd_beta*rng.standard_normal(size=(n_walkers, p))
tau_jitter = sd_tau*rng.standard_normal(size=(n_walkers, p))
mu_jitter = sd_mu*rng.standard_normal(size=(n_walkers, 1))
log_sigma_jitter = sd_log_sigma*rng.standard_normal(size=(n_walkers, 1))
jitter = np.hstack((
beta_jitter,
tau_jitter,
mu_jitter,
log_sigma_jitter
))
init_jitter = value[np.newaxis, :] + jitter
# Restrict tau to [0, 1]
init_jitter[:, p:2*p] = np.clip(init_jitter[:, p:2*p], 0.0, 1.0)
return init_jitter if n_walkers > 1 else init_jitter[0]
def neg_ll(theta, X, Y, grid, p):
n, _ = X.shape
beta = theta[:p]
tau = theta[p:2*p]
mus = theta[-2]*np.ones(n)
log_sigma = theta[-1]
sigma = np.exp(log_sigma)
idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
X_tau = X[:, idx]
return -(-n*log_sigma
- np.linalg.norm(Y - mus - X_tau@beta)**2/(2*sigma**2))
# We artificially constrain log(sigma) <= 3 (i.e. sigma <= e^3, roughly 20) to keep the variance at a sensible value
bounds = [(None, None)]*p_hat + [(0.0, 1.0)] * \
p_hat + [(None, None)] + [(None, 3.0)]
mle = scipy.optimize.minimize(
neg_ll,
x0=initial_guess_random(p_hat, sd_beta_init,
sd_mu_init, sd_log_sigma_init),
args=(X, Y, grid, p_hat),
bounds=bounds,
)
mle_theta = mle.x
mle_orig = np.copy(mle_theta)
mle_orig[-1] = np.exp(mle_theta[-1])**2 # Transform back to sigma^2
pd.DataFrame(zip(labels, mle_orig),
columns=["", "MLE"]).style.hide_index()
import emcee
We only need to provide the sampler with the logarithm of the (unnormalized) posterior. For clarity we split its computation into a log-prior and a log-likelihood, although a more efficient implementation would do everything in a single function (a sketch is given after the code below).
def log_prior_and_sample(theta):
"""Global parameters (for efficient parallelization): X, b0, g, eta, grid"""
n = X.shape[0]
beta = theta[:p_hat]
tau = theta[p_hat:2*p_hat]
mu = theta[-2]
log_sigma = theta[-1]
# Transform variables
b = beta - b0
sigma = np.exp(log_sigma)
# Impose constraints on parameters
if (tau < 0.0).any() or (tau > 1.0).any():
return -np.inf, np.full(n, -np.inf)
# Compute and regularize G_tau
idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
X_tau = X[:, idx]
G_tau = X_tau.T@X_tau
G_tau = (G_tau + G_tau.T)/2. # Enforce symmetry
G_tau_reg = G_tau + eta * \
np.max(np.linalg.eigvalsh(G_tau))*np.identity(p_hat)
# Compute log-prior
log_prior = (0.5*utils.logdet(G_tau_reg)
- (p_hat + 2)*log_sigma
- b.T@G_tau_reg@b/(2*g*sigma**2))
# Compute posterior predictive sample
posterior_predictive_sample = mu + X_tau@beta + \
sigma*rng.standard_normal(size=n)
return log_prior, posterior_predictive_sample
def log_likelihood(theta, Y):
"""Global parameters (for efficient parallelization): X, grid"""
return -neg_ll(theta, X, Y, grid, p_hat)
def log_posterior(theta, Y):
lp, pps = log_prior_and_sample(theta)
ll = log_likelihood(theta, Y)
lpos = lp + ll
return lpos, pps
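As mentioned above, a more efficient implementation would compute the whole log-posterior in a single pass. A minimal sketch, reusing the same global variables and dropping the posterior-predictive blob, could be:
def log_posterior_fused(theta, Y):
    """Sketch: single-pass log-posterior without the predictive blob.
    Uses the same globals as above: X, b0, g, eta, grid, p_hat."""
    n = X.shape[0]
    beta, tau = theta[:p_hat], theta[p_hat:2*p_hat]
    mu, log_sigma = theta[-2], theta[-1]
    # Impose the constraint tau in [0, 1]^p
    if (tau < 0.0).any() or (tau > 1.0).any():
        return -np.inf
    sigma = np.exp(log_sigma)
    idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
    X_tau = X[:, idx]
    G_tau = X_tau.T@X_tau
    G_tau = (G_tau + G_tau.T)/2.  # Enforce symmetry
    G_tau_reg = G_tau + eta*np.max(np.linalg.eigvalsh(G_tau))*np.identity(p_hat)
    b = beta - b0
    # log-prior + log-likelihood in one expression
    return (0.5*utils.logdet(G_tau_reg)
            - (p_hat + n + 2)*log_sigma
            - (np.linalg.norm(Y - mu - X_tau@beta)**2
               + b@G_tau_reg@b/g)/(2*sigma**2))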
We set up the initial points of the chains to be in a random neighbourhood around the MLE to increase the speed of convergence.
n_walkers = 100
n_iter_initial = 500
n_iter = 1000
# Start every walker in a (random) neighbourhood around the MLE
p0 = initial_guess_around_value(mle_theta, p_hat, n_walkers=n_walkers)
b0 = mle_theta[:p_hat] # <-- Change if needed
with Pool(N_CORES) as pool:
print(
f"-- Running affine-invariant ensemble sampler with {N_CORES} cores --")
sampler = emcee.EnsembleSampler(
n_walkers, theta_ndim, log_posterior, pool=pool, args=(Y,))
print("Tuning phase...")
state = sampler.run_mcmc(
p0, n_iter_initial, progress='notebook',
store=False)
sampler.reset()
print("MCMC sampling...")
sampler.run_mcmc(state, n_iter, progress='notebook')
We analyze the samples from all chains, discarding as burn-in a number of iterations equal to a few times the largest integrated autocorrelation time. We could also thin the chains and keep only every $k$-th value (one possible choice is sketched after the code below).
# Analyze autocorrelation and perform burn-in and thinning
autocorr = sampler.get_autocorr_time(quiet=True)
max_autocorr = np.max(autocorr)
burn = int(3*max_autocorr)
thin = 1
# Get trace of samples
trace = np.copy(sampler.get_chain(discard=burn, thin=thin))
trace[:, :, -1] = np.exp(trace[:, :, -1])**2 # Recover sigma^2
trace_flat = trace.reshape(-1, trace.shape[-1]) # All chains combined
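As an example of the thinning mentioned above, one (hedged) choice is to keep roughly one draw per half autocorrelation time:
# Hypothetical alternative: thin by about half the largest autocorrelation time,
# so that the kept draws are approximately independent
thin_alt = max(1, int(max_autocorr // 2))
trace_thin = sampler.get_chain(discard=burn, thin=thin_alt)
print(f"Thinned trace shape: {trace_thin.shape}")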
# Get InferenceData object
idata_emcee = az.from_emcee(
sampler,
var_names=labels,
blob_names=["y_obs"],
arg_names=["y_obs"],
blob_groups=["posterior_predictive"])
idata_emcee = idata_emcee.sel(draw=slice(burn, None, thin))
idata_emcee["posterior"][labels[-1]] = \
np.exp(idata_emcee["posterior"][labels[-1]])**2 # Recover sigma^2
# Sampler statistics
autocorr_thin = sampler.get_autocorr_time(discard=burn, thin=thin, quiet=True)
print(
f"Mean acceptance fraction: {100*np.mean(sampler.acceptance_fraction):.3f}%")
pd.DataFrame(
zip(labels, autocorr_thin, len(trace_flat)/autocorr_thin),
columns=["", "Autocorrelation times", "Effective i.i.d samples"]
).style.hide_index()
utils.summary(idata_emcee, labels, kind="stats")
utils.plot_evolution(trace, labels) # az.plot_trace(idata_emcee)
az.plot_posterior(idata_emcee, var_names=labels, point_estimate='mode',
grid=(NROWS(theta_ndim), NCOLS), textsize=20)
print("Marginal posterior distributions:")
utils.plot_ppc(idata_emcee, y_obs=Y, n_samples=500, var_names=["y_obs"])
az.plot_autocorr(idata_emcee, combined=True, grid=(NROWS(theta_ndim), NCOLS))
print("Combined autocorrelation times:")
This is only for testing purposes; in a production environment one should use the Backends feature of emcee to persist the chain (a sketch is given after the code below).
with open("emcee-p-fixed.idata", 'wb') as file:
pickle.dump(idata_emcee, file)
with open("emcee-p-fixed.idata", 'rb') as file:
idata_emcee = pickle.load(file)
trace = idata_emcee.posterior.to_array().to_numpy().T
trace_flat = trace.reshape(-1, trace.shape[-1]) # All chains combined
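For reference, a sketch of how the chain could instead be stored incrementally with emcee's HDF5 backend (the file name is illustrative and h5py must be installed):
# Sketch: persist the chain in an HDF5 file while sampling
backend = emcee.backends.HDFBackend("emcee-p-fixed.h5")
backend.reset(n_walkers, theta_ndim)
sampler_h5 = emcee.EnsembleSampler(
    n_walkers, theta_ndim, log_posterior, backend=backend, args=(Y,))
sampler_h5.run_mcmc(p0, n_iter, progress='notebook')
# In a later session the chain can be recovered without re-running:
# backend = emcee.backends.HDFBackend("emcee-p-fixed.h5", read_only=True)
# chain = backend.get_chain(discard=burn, thin=thin)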
import pymc3 as pm
import theano
import theano.tensor as tt
# Labels for pymc
labeller = az.labels.MapLabeller(
var_name_map={
"beta": r"$\beta$",
"tau": r"$\tau$",
"mu": r"$\mu$",
"sigma2": r"$\sigma^2$"}
)
We set the value of $b_0$ as the MLE for $\beta$.
def make_model(p, g, eta, X, Y, grid, init_MLE=True):
if init_MLE:
b0 = mle_theta[:p]
else:
b0 = g*rng.standard_normal(size=p) # <-- Change if needed
X_t = theano.shared(X) # Transform X into Theano tensor
with pm.Model() as model:
mu_and_log_sigma = pm.DensityDist(
'mu_and_log_sigma', lambda x: 0, shape=(2,))
mu = pm.Deterministic('mu', mu_and_log_sigma[0])
mus = mu*np.ones_like(Y)
log_sigma = mu_and_log_sigma[1]
sigma = pm.math.exp(log_sigma)
sigma2 = pm.Deterministic('sigma2', sigma**2)
tau = pm.Uniform('tau', 0.0, 1.0, shape=(p,))
idx = np.abs(grid - tau[:, np.newaxis]).argmin(1)
X_tau = X_t[:, idx]
G_tau = pm.math.matrix_dot(X_tau.T, X_tau)
G_tau = (G_tau + G_tau.T)/2. # Enforce symmetry
G_tau_reg = G_tau + eta * \
tt.max(tt.nlinalg.eigh(G_tau)[0])*np.identity(p)
def beta_lprior(x):
b = x - b0
return (0.5*pm.math.logdet(G_tau_reg)
- p*log_sigma
- pm.math.matrix_dot(b.T, G_tau_reg, b)/(2.*g*sigma2))
beta = pm.DensityDist('beta', beta_lprior, shape=(p,))
expected_obs = mus + pm.math.matrix_dot(X_tau, beta)
y_obs = pm.Normal('y_obs', mu=expected_obs, sigma=sigma, observed=Y)
return model
# NUTS with MLE as starting point
model_MLE = make_model(p_hat, g, eta, X, Y, grid)
with model_MLE:
print("Starting from MLE...")
ttr = model_MLE.named_vars['tau'].transformation
start = {"beta": mle_theta[:p_hat],
"tau_interval__": ttr.forward(mle_theta[p_hat:2*p_hat]).eval(),
"mu_and_log_sigma": mle_theta[-2:]}
idata_mle = pm.sample(1000, cores=2, tune=1000, start=start,
target_accept=0.8, return_inferencedata=True)
utils.summary(idata_mle, theta_names)
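If any of these NUTS runs fails right away (e.g. with bad initial energy), a quick diagnostic, sketched here and not part of the original workflow, is to inspect the log-probability of each variable at the model's test point:
# Per-variable log-probability at the model's test point; NaN or -inf
# usually signals a problem with the custom DensityDist priors
print(model_MLE.check_test_point())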
# NUTS with MAP as starting point
model_MAP = make_model(p_hat, g, eta, X, Y, grid)
with model_MAP:
print("Computing MAP estimate...")
start = pm.find_MAP()
print("Starting from MAP estimate...")
idata_map = pm.sample(1000, cores=2, tune=1000, start=start,
target_accept=0.8, return_inferencedata=True)
utils.summary(idata_map, theta_names)
# NUTS with auto initialization
model_auto = make_model(p_hat, g, eta, X, Y, grid)
with model_auto:
idata_auto = pm.sample(1000, cores=2, tune=1000, target_accept=0.8,
return_inferencedata=True)
utils.summary(idata_auto, theta_names)
Since the tuning iterations already serve as burn-in, we keep the whole trace. In addition, we could consider thinning the samples.
# Select and print best model
burn = 0
thin = 1
model = model_auto
idata_pymc = idata_auto
idata_pymc = idata_pymc.sel(draw=slice(burn, None, thin))
print("Graphical model:")
pm.model_graph.model_to_graphviz(model)
az.plot_trace(idata_pymc, var_names=theta_names, labeller=labeller)
print("Trace plot:")
az.plot_posterior(
idata_pymc, point_estimate='mode',
var_names=theta_names,
labeller=labeller,
textsize=20,
grid=(NROWS(theta_ndim), NCOLS))
print("Marginal posterior distributions:")
with model:
print("Generating posterior predictive samples...")
ppc = pm.sample_posterior_predictive(idata_pymc, var_names=["y_obs"])
utils.plot_ppc(
az.from_pymc3(posterior_predictive=ppc, model=model),
y_obs=Y, n_samples=500, var_names=["y_obs"])
az.plot_autocorr(idata_pymc, var_names=theta_names,
combined=True, grid=(NROWS(theta_ndim), NCOLS))
print("Combined autocorrelation times:")
_ = idata_pymc.to_netcdf("pymc-p-fixed.nc")
idata_pymc = az.from_netcdf("pymc-p-fixed.nc")
az.plot_pair
corner.corner
Metropolis (+ MLE, MAP)
DeMetropolis(Z)
Variational Inference in PyMC3
Some differences in the new release (see RELEASE-NOTES):
rng_seeder argument to set the random seed.
%load_ext watermark
%watermark -n -u -v -iv -w